package com.flow.framework.schedule.job;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.segments.MergeSegments;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.SerializationUtils;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.flow.framework.cache.constant.FrameworkCacheConstant;
import com.flow.framework.cache.service.cache.common.ICommonCacheService;
import com.flow.framework.common.error.SystemErrorCode;
import com.flow.framework.common.exception.CheckedException;
import com.flow.framework.common.type.TypeReference;
import com.flow.framework.common.util.verify.VerifyUtil;
import com.flow.framework.core.holder.SecurityContextHolder;
import com.flow.framework.core.holder.SystemVersionContextHolder;
import com.flow.framework.core.pojo.vo.PageModuleVo;
import com.flow.framework.core.properties.FrameworkCoreConfigProperties;
import com.flow.framework.core.system.thread.pool.factory.DefaultThreadFactory;
import com.flow.framework.facade.system.pojo.vo.SystemVersionModuleVo;
import com.flow.framework.lock.helper.LockHelper;
import com.flow.framework.lock.service.lock.ILockService;
import com.flow.framework.persistence.pojo.po.base.BaseBizPo;
import com.flow.framework.persistence.pojo.po.base.BaseSystemVersionPo;
import com.flow.framework.schedule.enumeration.FrameworkScheduleLockKeyEnum;
import com.flow.framework.schedule.pojo.bo.ScheduleRequiredFieldBo;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

/**
 * po分页调度辅助类
 *
 * @author luoguopiao
 * @version 0.0.1
 * @date 2022/4/10
 */
@Slf4j
class ScheduleHelper {

    /**
     * 当前最大ID缓存的过期时间：30秒
     */
    private static final long MAX_ID_EXPIRE_TIME = 30000;

    /**
     * 当前最大ID缓存续期的间隔时间：5秒
     */
    private static final long MAX_ID_RENEW_TIME = 5000;

    @Data
    private static class PreprocessBo<E extends BaseSystemVersionPo> {

        private LambdaQueryWrapper<E> lambdaQueryWrapper;

        private int pageSize;

        private String cacheKey;
    }

    private static class ScheduledExecutorHolder {
        private static final ScheduledExecutorService SCHEDULED_EXECUTOR
                = new ScheduledThreadPoolExecutor(1,
                new DefaultThreadFactory("schedule_cache_max_id_"),
                new ThreadPoolExecutor.AbortPolicy());
    }


    private static <E extends BaseSystemVersionPo> void preprocess(PreprocessBo<E> preprocessBo,
                                                                   ScheduleRequiredFieldBo<E> scheduleRequiredFieldBo, String clazzName) {
        IPagePoScheduleHandler<E> pagePoScheduleHandler = scheduleRequiredFieldBo.getPagePoScheduleHandler();
        String params = scheduleRequiredFieldBo.getParams();
        LambdaQueryWrapper<E> lambdaQueryWrapper = getLambdaQueryWrapper(scheduleRequiredFieldBo, clazzName);
        if (null == lambdaQueryWrapper) {
            return;
        }
        int pageSize = pagePoScheduleHandler.getPageSize(params);
        List<String> cacheParams = new ArrayList<>();
        cacheParams.add(clazzName);
        String jobName = pagePoScheduleHandler.getJobName();
        if (!VerifyUtil.isEmpty(jobName)) {
            cacheParams.add(jobName);
        }
        if (!VerifyUtil.isEmpty(params)) {
            cacheParams.add(params);
        }
        String tenantId = SecurityContextHolder.getTenantIdQuietly();
        if (!VerifyUtil.isEmpty(tenantId)) {
            cacheParams.add(tenantId);
        }
        String cacheKey = String.join(FrameworkCacheConstant.CACHE_KEY_SPLIT_STRING, cacheParams);
        preprocessBo.setLambdaQueryWrapper(lambdaQueryWrapper);
        preprocessBo.setPageSize(pageSize);
        preprocessBo.setCacheKey(cacheKey);
    }

    /**
     * base system version po分页调度
     *
     * @param scheduleRequiredFieldBo scheduleRequiredFieldDo
     * @return 本次处理数据的条数
     */
    static <E extends BaseSystemVersionPo> long systemVersionPageSchedule(ScheduleRequiredFieldBo<E> scheduleRequiredFieldBo) {
        IPagePoScheduleHandler<E> pagePoScheduleHandler = scheduleRequiredFieldBo.getPagePoScheduleHandler();
        String clazzName = pagePoScheduleHandler.getClass().getSimpleName();
        PreprocessBo<E> preprocessBo = new PreprocessBo<>();
        preprocess(preprocessBo, scheduleRequiredFieldBo, clazzName);

        LambdaQueryWrapper<E> lambdaQueryWrapper = preprocessBo.getLambdaQueryWrapper();
        if (null == lambdaQueryWrapper) {
            return 0;
        }
        long dataCount = 0;
        int pageSize = preprocessBo.getPageSize();
        String jobName = pagePoScheduleHandler.getJobName();
        String cacheKey = preprocessBo.getCacheKey();

        String params = scheduleRequiredFieldBo.getParams();

        // 设置首次执行任务的最大id
        ICommonCacheService commonCacheService = scheduleRequiredFieldBo.getCommonCacheService();
        commonCacheService.setIfAbsent(0, cacheKey, pagePoScheduleHandler.getStartGreaterId(params), MAX_ID_EXPIRE_TIME,
                TimeUnit.MILLISECONDS);

        AtomicReference<ScheduledFuture<?>> scheduledFutureReference = new AtomicReference<>();
        try {
            ScheduledFuture<?> future = ScheduledExecutorHolder.SCHEDULED_EXECUTOR.scheduleAtFixedRate(() -> {
                        try {
                            commonCacheService.expire(0, cacheKey, MAX_ID_EXPIRE_TIME, TimeUnit.MILLISECONDS);
                        } catch (Exception e) {
                            log.warn("max id expire error. maybe job exec finished, job name : {}, clazz : {}", jobName, clazzName, e);
                            Optional.ofNullable(scheduledFutureReference.get()).ifPresent(scheduledFuture -> {
                                try {
                                    scheduledFuture.cancel(true);
                                } catch (Exception ignore) {
                                }
                            });
                        }
                    },
                    (int) (MAX_ID_RENEW_TIME * Math.random()), MAX_ID_RENEW_TIME, TimeUnit.MILLISECONDS);
            scheduledFutureReference.set(future);
            while (true) {
                IPage<E> page = getPage(preprocessBo, scheduleRequiredFieldBo);
                if (null == page) {
                    log.debug("result page is null, maybe job execute finished, clazz : {}", clazzName);
                    return dataCount;
                }
                log.debug("total count : {}, page size: {} clazz : {}", page.getTotal(), pageSize, clazzName);
                List<E> records = page.getRecords();
                int size = records.size();
                if (!VerifyUtil.isEmpty(records)) {
                    pagePoScheduleHandler.process(records, page.getCurrent(), page.getSize(), page.getTotal(), params);
                }
                dataCount += size;
                if (size < pageSize) {
                    break;
                }
            }
        } finally {
            try {
                scheduledFutureReference.get().cancel(true);
            } catch (Exception ignore) {
            }
        }
        return dataCount;
    }

    @SuppressWarnings("deprecation")
    private static <E extends BaseSystemVersionPo> IPage<E> getPage(PreprocessBo<E> preprocessBo,
                                                                    ScheduleRequiredFieldBo<E> scheduleRequiredFieldBo) {
        IPagePoScheduleHandler<E> pagePoScheduleHandler = scheduleRequiredFieldBo.getPagePoScheduleHandler();
        String clazzName = pagePoScheduleHandler.getClass().getSimpleName();
        LambdaQueryWrapper<E> lambdaQueryWrapper = preprocessBo.getLambdaQueryWrapper();
        if (null == lambdaQueryWrapper) {
            return null;
        }
        int pageSize = preprocessBo.getPageSize();
        String jobName = pagePoScheduleHandler.getJobName();
        String cacheKey = preprocessBo.getCacheKey();
        long queryTimeout = pagePoScheduleHandler.getQueryTimeout(scheduleRequiredFieldBo.getParams());
        ILockService lockService = LockHelper.getLockService();
        ICommonCacheService commonCacheService = scheduleRequiredFieldBo.getCommonCacheService();
        BaseMapper<E> mapper = scheduleRequiredFieldBo.getMapper();
        String lastMaxId = commonCacheService.get(0, cacheKey, new TypeReference<String>() {
        });
        if (VerifyUtil.isEmpty(lastMaxId)) {
            return null;
        }
        String lockKey = LockHelper.getLockKey(FrameworkScheduleLockKeyEnum.FRAMEWORK_SCHEDULE_EXEC_LOCK_KEY,
                Collections.singletonList(cacheKey));
        boolean lockSuccess = lockService.tryLock(lockKey, queryTimeout, -1);
        if (lockSuccess) {
            try {
                lastMaxId = commonCacheService.get(0, cacheKey, new TypeReference<String>() {
                });
                if (VerifyUtil.isEmpty(lastMaxId)) {
                    return null;
                }
                LambdaQueryWrapper<E> cloneLambdaQueryWrapper = SerializationUtils.clone(lambdaQueryWrapper);
                cloneLambdaQueryWrapper.gt(E::getId, lastMaxId);
                IPage<E> page = mapper.selectPage(new Page<E>().setSize(pageSize), cloneLambdaQueryWrapper);
                List<E> records = page.getRecords();
                if (VerifyUtil.isEmpty(records)) {
                    commonCacheService.delete(0, cacheKey);
                    return page;
                }
                int size = records.size();
                E po = records.get(size - 1);
                if (size < pageSize) {
                    log.debug("result page is null, maybe job execute finished, clazz : {}", clazzName);

                    // 这里必须删除缓存，避免数据库一直产生新数据，导致任务无法结束
                    commonCacheService.delete(0, cacheKey);
                } else {
                    commonCacheService.set(0, cacheKey, po.getId(), MAX_ID_EXPIRE_TIME, TimeUnit.MILLISECONDS);
                }
                return page;
            } catch (Exception e) {
                log.error("job page query error. clazz name : {}, job name : {}, max id : {}", clazzName, jobName,
                        lastMaxId, e);

                // 这里必须删除缓存，避免一直查询导致任务无法结束
                commonCacheService.delete(0, cacheKey);
                throw new CheckedException(SystemErrorCode.SCHEDULE_JOB_ERROR);
            } finally {
                lockService.unlock(lockKey);
            }
        } else {
            log.error("schedule job error, because get lock error. cache key : {}", cacheKey);
            throw new CheckedException(SystemErrorCode.SCHEDULE_JOB_ERROR);
        }
    }

    @Nullable
    private static <E extends BaseSystemVersionPo> LambdaQueryWrapper<E> getLambdaQueryWrapper(ScheduleRequiredFieldBo<E> scheduleRequiredFieldBo,
                                                                                               String handlerClazzName) {
        LambdaQueryWrapper<E> lambdaQueryWrapper = new LambdaQueryWrapper<E>();
        scheduleRequiredFieldBo.getPagePoScheduleHandler().buildLambdaQuery(scheduleRequiredFieldBo.getParams(), lambdaQueryWrapper);
        MergeSegments expression = lambdaQueryWrapper.getExpression();
        if (!VerifyUtil.isEmpty(expression.getOrderBy())) {
            log.error("query condition can't be customize order by. clazz : {}", handlerClazzName);
            throw new CheckedException(SystemErrorCode.PARAMS_ERROR);
        }

        // 按照ID升序排列
        lambdaQueryWrapper.orderByAsc(E::getId);

        // 如果系统配置需要支撑环境编码，则需要处理系统版本（业务数据库中的数据是有版本号进行系统版本区分的）
        FrameworkCoreConfigProperties frameworkCoreConfigProperties = scheduleRequiredFieldBo.getFrameworkCoreConfigProperties();
        boolean enableEnvCodeSupport = frameworkCoreConfigProperties.isEnableEnvCodeSupport();
        if (enableEnvCodeSupport) {
            String envCode = frameworkCoreConfigProperties.getEnvCode();
            SystemVersionModuleVo systemVersion = scheduleRequiredFieldBo.getSystemFrameworkModuleService().getSystemVersion(envCode);
            if (null == systemVersion || null == systemVersion.getCurrentVersion()) {
                log.error("env code system version or current version can't be null. env code: {}", envCode);
                throw new CheckedException(SystemErrorCode.PARAMS_ERROR);
            }
            SystemVersionContextHolder.setCurrentSystemVersion(systemVersion.getCurrentVersion());

            Long startVersion = systemVersion.getStartVersion();
            Long endVersion = systemVersion.getEndVersion();
            if (null == startVersion || null == endVersion) {
                log.error("get start version and end version error.");
                throw new CheckedException(SystemErrorCode.PARAMS_ERROR);
            }

            // 这里不能使用mybatis拦截器加上下文的方式，因为在自定义sql关联查询时会在每一个子查询中的设置值，导致新版版数据无法查到其他版本数据
            lambdaQueryWrapper.ge(E::getSystemVersion, startVersion);
            lambdaQueryWrapper.le(E::getSystemVersion, endVersion);
        }
        return lambdaQueryWrapper;
    }

    /**
     * biz po分页调度
     *
     * @param scheduleRequiredFieldBo scheduleRequiredFieldDo
     * @return 本次处理数据的条数
     */
    static <E extends BaseBizPo> long bizPoPageSchedule(ScheduleRequiredFieldBo<E> scheduleRequiredFieldBo) {
        boolean enableTenantSupport = scheduleRequiredFieldBo.getFrameworkCoreConfigProperties().isEnableTenantSupport();
        if (enableTenantSupport) {
            long totalDataCount = 0;
            long currentPage = 0;
            while (true) {
                PageModuleVo<String> pageModuleVo = scheduleRequiredFieldBo.getAuthorityFrameworkModuleService().paginationTenantIds(currentPage);
                List<String> tenantIds = pageModuleVo.getRecords();
                int size = tenantIds.size();
                for (String tenantId : tenantIds) {
                    try {
                        SecurityContextHolder.setTenantId(tenantId);
                        long dataCount = systemVersionPageSchedule(scheduleRequiredFieldBo);
                        totalDataCount += dataCount;
                    } finally {
                        SecurityContextHolder.clearTenantId();
                    }
                }
                if (size < pageModuleVo.getSize()) {
                    break;
                }
                currentPage++;
            }
            return totalDataCount;
        } else {
            return systemVersionPageSchedule(scheduleRequiredFieldBo);
        }
    }
}